#include "config.h"

#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/mman.h>
#include <errno.h>

#define DBG_SUBSYS S_LIBCHUNK

#include "configure.h"
#include "net_table.h"
#include "net_global.h"
#include "sdevent.h"
#include "sysy_lib.h"
#include "timer.h"
#include "ylock.h"
#include "bh.h"
#include "chunk_cleanup.h"
#include "stor_rpc.h"
#include "md_map.h"
#include "md_root.h"
#include "adt.h"
#include "fence.h"
#include "volume.h"
#include "cluster.h"
#include "dbg.h"

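/**
 * @file Offline message handling
 *
 * Messages are organized by nid. Currently this is mainly used to actively
 * reclaim chunks; a general chunk GC mechanism is still needed.
 */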

/* Table of entry_t objects, hashed and compared by nid (see __key/__cmp). */
static hashtable_t __chunk_cleanup__;
/* Read/write lock taken around lookups, inserts, removals and iteration of the table. */
static sy_rwlock_t __lock__;

/**
 * One cleanup request. The whole structure is handed to msgqueue_push()
 * as the message payload and read back in __chunk_cleanup_exec().
 */
typedef struct {
        fileid_t parent;          /* parent of the chunk (pool or volume chunk) */
        chkid_t chkid;            /* chunk to be cleaned up */
        uint64_t meta_version;    /* version passed along to the cleanup RPC */
        char pool[MAX_NAME_LEN];  /* NUL-terminated pool name */
} cleanup_msg_t;

/**
 * In-memory node queued on commit_worker_t::list by chunk_cleanup_push().
 *
 * NOTE: hook must stay the first member; the commit worker casts a
 * struct list_head pointer directly to msglist_t.
 */
typedef struct {
        struct list_head hook;
        nid_t nid;                /* node the request is addressed to */
        cleanup_msg_t msg;        /* payload persisted into that node's queue */
} msglist_t;

/**
 * State shared between chunk_cleanup_push() (producer) and the commit
 * worker thread (consumer).
 */
typedef struct {
        sem_t sem;               /* posted once per queued request to wake the worker */
        sy_spinlock_t lock;      /* held while the list is modified */
        struct list_head list;   ///< list of msglist_t
} commit_worker_t;

/**
 * Per-node entry stored in the __chunk_cleanup__ table.
 */
typedef struct {
        nid_t nid;               /* node this entry belongs to; also the hash key */
        int running;             /* set to 1 when a cleanup worker thread is started for this node */
        msgqueue_t msgqueue;     ///< persistence to local file
} entry_t;

/* Commit worker instance; assigned by __chunk_cleanup_commit_worker_init(). */
static commit_worker_t *__commit_worker__;
/* Timer driving __chunk_cleanup_worker__(); it is re-armed for 30 seconds after every run. */
static worker_handler_t __chunk_cleanup_timer__;

/*
 * Size of one record in a node's message queue: the cleanup_msg_t payload
 * plus a msgqueue_msg_t (presumably the per-record header -- it carries the
 * len/crc/buf fields checked in __chunk_cleanup_exec()).
 */
#define CHUNK_CLEANUP_MSG_SIZE (sizeof(cleanup_msg_t) + sizeof(msgqueue_msg_t))

/**
 * Open the on-disk message queue of a node, creating it when it is missing.
 *
 * The queue path is "<ng.home>/cleanup/<nid>". msgqueue_load() is tried
 * first; only when it reports ENOENT is msgqueue_init() called with the
 * same arguments.
 *
 * @param ent entry whose msgqueue member is opened
 * @param nid node id used to build the queue path
 * @return 0 on success, otherwise the error returned by
 *         msgqueue_load()/msgqueue_init()
 */
static int __chunk_cleanup_msgload(entry_t *ent, const nid_t *nid)
{
        int ret;
        char home[MAX_PATH_LEN];

        /*
         * Bound the write by the size of the destination itself; the buffer
         * is MAX_PATH_LEN bytes, so limiting by another constant could let
         * snprintf() write past its end.
         */
        snprintf(home, sizeof(home), "%s/cleanup/"NID_FORMAT,
                 ng.home, NID_ARG(nid));

        DINFO("load msg queue %s\n", home);

        ret = msgqueue_load(&ent->msgqueue, home,
                        CHUNK_CLEANUP_MSG_SIZE,
                        gloconf.chunk_cleanup_msgqueue_size,
                        1);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        /* no queue on disk yet: create a fresh one */
                        ret = msgqueue_init(&ent->msgqueue, home,
                                        CHUNK_CLEANUP_MSG_SIZE,
                                        gloconf.chunk_cleanup_msgqueue_size,
                                        1);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                } else
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Look up the entry of a node in the __chunk_cleanup__ table.
 *
 * The function does no locking itself.
 *
 * @param nid node id to search for
 * @param _ent receives the entry when it is found
 * @return 0 on success, ENOKEY when the table holds no entry for nid
 */
static int __chunk_cleanup_find(const nid_t *nid, entry_t **_ent)
{
        int ret;
        entry_t *found = hash_table_find(__chunk_cleanup__, (void *)nid);

        if (found != NULL) {
                *_ent = found;
                return 0;
        }

        ret = ENOKEY;
        GOTO(err_ret, ret);
err_ret:
        return ret;
}


static int __chunk_cleanup_create(const nid_t *nid)
{
        int ret;
        entry_t *ent;

        ret = ymalloc((void **)&ent, sizeof(*ent));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent->running = 0;
        ent->nid = *nid;

        ret = __chunk_cleanup_msgload(ent, nid);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = sy_rwlock_wrlock(&__lock__);
        if (unlikely(ret))
                GOTO(err_free1, ret);

        ret = hash_table_insert(__chunk_cleanup__, (void *)ent, (void *)nid, 0);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        sy_rwlock_unlock(&__lock__);

        return 0;
err_lock:
        sy_rwlock_unlock(&__lock__);
err_free1:
        msgqueue_close(&ent->msgqueue);
err_free:
        yfree((void **)&ent);
err_ret:
        return ret;
}

#if 0
/**
 * (Compiled out by the surrounding #if 0.)
 *
 * Read messages from the queue of a node via msgqueue_get(), creating the
 * node's entry first when it does not exist yet.
 *
 * @param nid node whose queue is read
 * @param buf destination buffer
 * @param len size of buf in bytes
 * @return the non-negative result of msgqueue_get() on success,
 *         a negated error code on failure
 */
int chunk_cleanup_msgget(const nid_t *nid, void *buf, int len)
{
        int ret;
        entry_t *ent;

retry:
        ret = sy_rwlock_rdlock(&__lock__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __chunk_cleanup_find(nid, &ent);
        if (unlikely(ret)) {
                /* drop the read lock: the create path takes the write lock */
                sy_rwlock_unlock(&__lock__);
                ret = __chunk_cleanup_create(nid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                DINFO("retry "NID_FORMAT"\n", NID_ARG(nid));
                goto retry;
        }

        ret = msgqueue_get(&ent->msgqueue, buf, len);
        if (ret < 0) {
                /* flip to a positive code here; it is negated again on return */
                ret = -ret;
                GOTO(err_lock, ret);
        }

        sy_rwlock_unlock(&__lock__);

        return ret;
err_lock:
        sy_rwlock_unlock(&__lock__);
err_ret:
        return -ret;
}

/**
 * (Compiled out by the surrounding #if 0.)
 *
 * Pop messages from the queue of a node via msgqueue_pop(), creating the
 * node's entry first when it does not exist yet.
 *
 * @param nid node whose queue is popped
 * @param buf destination buffer
 * @param len size of buf in bytes
 * @return the non-negative result of msgqueue_pop() on success,
 *         a negated error code on failure
 */
int chunk_cleanup_msgpop(const nid_t *nid, void *buf, int len)
{
        int ret;
        entry_t *ent;

retry:
        ret = sy_rwlock_rdlock(&__lock__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __chunk_cleanup_find(nid, &ent);
        if (unlikely(ret)) {
                /* drop the read lock: the create path takes the write lock */
                sy_rwlock_unlock(&__lock__);
                ret = __chunk_cleanup_create(nid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                DINFO("retry "NID_FORMAT"\n", NID_ARG(nid));
                goto retry;
        }

        ret = msgqueue_pop(&ent->msgqueue, buf, len);
        if (ret < 0) {
                /* flip to a positive code here; it is negated again on return */
                ret = -ret;
                GOTO(err_lock, ret);
        }

        sy_rwlock_unlock(&__lock__);

        return ret;
err_lock:
        sy_rwlock_unlock(&__lock__);
err_ret:
        return -ret;
}

#endif


/**
 * Queue an offline cleanup request for one chunk replica (requests are
 * organized by nid).
 *
 * The request is only appended to the in-memory list of the commit worker,
 * which is then woken through its semaphore; writing the request into the
 * per-node message queue is done by the commit worker thread.
 *
 * @param pool pool name; must be shorter than MAX_NAME_LEN
 * @param parent parent id; its type must be __POOL_CHUNK__ or
 *               __VOLUME_CHUNK__ (asserted)
 * @param chkid chunk to clean up
 * @param nid node holding the replica; nid->id must be > 0 (asserted)
 * @param meta_version version stored with the request
 * @return 0 on success, ENAMETOOLONG when pool does not fit into the
 *         message, otherwise the error of ymalloc()/sy_spin_lock()
 */
int chunk_cleanup_push(const char *pool, const fileid_t *parent,
                       const chkid_t *chkid, const nid_t *nid, uint64_t meta_version)
{
        int ret;
        size_t pool_len;
        commit_worker_t *commit_worker = __commit_worker__;
        msglist_t *ent;
        cleanup_msg_t *msg;

        YASSERT(parent->type == __POOL_CHUNK__
                || parent->type == __VOLUME_CHUNK__);
        YASSERT(nid->id > 0);

        /*
         * msg->pool is a fixed MAX_NAME_LEN buffer: refuse names that do not
         * fit (including the terminator) instead of writing past its end.
         * sizeof does not evaluate its operand, so ent is not dereferenced.
         */
        pool_len = strlen(pool);
        if (pool_len >= sizeof(ent->msg.pool)) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        ret = ymalloc((void **)&ent, sizeof(*ent));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent->nid = *nid;
        msg = &ent->msg;

        /*
         * The whole cleanup_msg_t is later pushed to the message queue, so
         * clear it first: bytes after the pool name and any padding then
         * have a defined value regardless of what ymalloc() returned.
         */
        memset(msg, 0, sizeof(*msg));
        memcpy(msg->pool, pool, pool_len + 1);
        msg->parent = *parent;
        msg->chkid = *chkid;
        msg->meta_version = meta_version;

        ret = sy_spin_lock(&commit_worker->lock);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        list_add_tail(&ent->hook, &commit_worker->list);

        sy_spin_unlock(&commit_worker->lock);

        /* ent now belongs to the commit worker; do not touch it below */
        sem_post(&commit_worker->sem);

        DBUG("cleanup "CHKID_FORMAT" of %s\n", CHKID_ARG(chkid), network_rname(nid));

        return 0;
err_free:
        yfree((void **)&ent);
err_ret:
        return ret;
}

/**
 * Write one queued request into the message queue of its target node.
 *
 * The node's entry is looked up under the read lock; when it is missing
 * the lock is dropped, the entry is created and the lookup is repeated.
 *
 * @param msglist request taken from the commit worker list
 * @return 0 on success, otherwise a positive error code
 */
STATIC int __chunk_cleanup_push(msglist_t *msglist)
{
        int ret, retry = 0;
        entry_t *entry;
        nid_t *nid = &msglist->nid;
        cleanup_msg_t *payload = &msglist->msg;

        DBUG("cleanup "CHKID_FORMAT" of %s\n", CHKID_ARG(&payload->chkid), network_rname(nid));

        while (1) {
                ret = sy_rwlock_rdlock(&__lock__);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __chunk_cleanup_find(nid, &entry);
                if (!ret) {
                        /* found: leave the loop with the read lock held */
                        break;
                }

                sy_rwlock_unlock(&__lock__);

                ret = __chunk_cleanup_create(nid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                retry++;
                if (retry > 1)
                        DINFO("load "NID_FORMAT", retry %u\n", NID_ARG(nid), retry);
        }

        ret = msgqueue_push(&entry->msgqueue, payload, sizeof(*payload));
        if (ret < 0) {
                /* msgqueue_push reports failure as a negative value */
                ret = -ret;
                GOTO(err_lock, ret);
        }

        sy_rwlock_unlock(&__lock__);

        return 0;
err_lock:
        sy_rwlock_unlock(&__lock__);
err_ret:
        return ret;
}

/**
 * Tell whether a node appears among the replicas of a chunk.
 *
 * @param chkinfo chunk info whose diskid[0..repnum) list is searched
 * @param nid node id to look for
 * @return 1 when one of the replicas is on nid, 0 otherwise
 */
static int __chunk_cleanup_exist(const chkinfo_t *chkinfo, const nid_t *nid)
{
        int idx = 0;
        int found = 0;

        /* stop at the first replica whose node id matches */
        while (!found && idx < chkinfo->repnum) {
                found = (nid_cmp(&chkinfo->diskid[idx].id, nid) == 0);
                idx++;
        }

        return found;
}

/**
 * Queue a cleanup request for every replica that was dropped between two
 * versions of a chunk's info.
 *
 * Nothing is done when both infos carry the same info_version; otherwise
 * `from` must be older than `to` (asserted). Each node listed in `from`
 * but not in `to` gets a request for chunk from->id with version
 * from->info_version.
 *
 * @param pool pool name passed on to chunk_cleanup_push()
 * @param parent parent id passed on to chunk_cleanup_push()
 * @param from chunk info before the update
 * @param to chunk info after the update
 */
void chunk_cleanup_compare(const char *pool, const fileid_t *parent,
                           const chkinfo_t *from, const chkinfo_t *to)
{
        int i, ret;
        char tmp1[MAX_BUF_LEN], tmp2[MAX_BUF_LEN];
        const nid_t *nid;

        if (from->info_version == to->info_version) {
                return;
        }

        YASSERT(from->info_version < to->info_version);

        for (i = 0; i < from->repnum; i++) {
                nid = &from->diskid[i].id;

                if (__chunk_cleanup_exist(to, nid)) {
                        continue;
                }

                CHKINFO_STR(from, tmp1);
                CHKINFO_STR(to, tmp2);

                DBUG("update %s --> %s, remove chunk @ %s, parent "CHKID_FORMAT"\n",
                      tmp1, tmp2, network_rname(nid), CHKID_ARG(parent));

                /*
                 * This function cannot report errors to its caller, so at
                 * least leave a trace when a request could not be queued;
                 * the remaining replicas are still processed.
                 */
                ret = chunk_cleanup_push(pool, parent, &from->id, nid, from->info_version);
                if (unlikely(ret)) {
                        DWARN("cleanup "CHKID_FORMAT" @ %s not queued, ret %d\n",
                              CHKID_ARG(&from->id), network_rname(nid), ret);
                }
        }
}

/**
 * Hash callback of the __chunk_cleanup__ table.
 *
 * @param args pointer to the nid_t used as key
 * @return the id field of the key
 */
static uint32_t __key(const void *args)
{
        return ((const nid_t *)args)->id;
}

/**
 * Compare callback of the __chunk_cleanup__ table.
 *
 * @param v1 stored entry_t
 * @param v2 nid_t key being looked up
 * @return result of nid_cmp() between the entry's nid and the key
 */
static int __cmp(const void *v1, const void *v2)
{
        const entry_t *stored = (const entry_t *)v1;
        const nid_t *wanted = (const nid_t *)v2;

        DBUG("cmp "NID_FORMAT" --> "NID_FORMAT"\n",
              NID_ARG(&stored->nid), NID_ARG(wanted));

        return nid_cmp(&stored->nid, wanted);
}

static int __chunk_cleanup_close(const nid_t *nid)
{
        int ret;
        entry_t *ent;

        ret = sy_rwlock_wrlock(&__lock__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = hash_table_remove(__chunk_cleanup__, (void *)nid, (void *)&ent);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        msgqueue_close(&ent->msgqueue);
        yfree((void **)&ent);

        sy_rwlock_unlock(&__lock__);

        return 0;
err_lock:
        sy_rwlock_unlock(&__lock__);
err_ret:
        return ret;
}

/**
 * Execute one cleanup request.
 *
 * A root parent is handled by md_root_chunk_cleanup(). For any other
 * parent the serving node is resolved with md_map_getsrv() and the
 * cleanup RPC is sent to it; an EREMCHG answer drops the cached mapping
 * and the whole sequence is retried exactly once.
 *
 * @param pool pool name (used for the root case only)
 * @param fileid parent id of the chunk
 * @param chkid chunk to clean up
 * @param _nid node holding the replica to remove
 * @param meta_version version passed along with the request
 * @return 0 on success, otherwise the error of the failing call
 */
static int __chunk_cleanup_chunk(const char *pool, const fileid_t *fileid,
                             const chkid_t *chkid, const nid_t *_nid, uint64_t meta_version)
{
        int ret, retried = 0;
        nid_t srv;

        if (chkid_isroot(fileid)) {
                DWARN("cleanup root chunk\n");
                ret = md_root_chunk_cleanup(pool, _nid, meta_version);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                return 0;
        }

        while (1) {
                ret = md_map_getsrv(fileid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = stor_rpc_chunk_cleanup(&srv, fileid, chkid, _nid, meta_version);
                if (!ret)
                        break;

                /* only a first EREMCHG is worth another attempt */
                if (ret != EREMCHG || retried != 0)
                        GOTO(err_ret, ret);

                md_map_drop(fileid, &srv);
                retried = 1;
        }

        return 0;
err_ret:
        return ret;
}


/**
 * Execute every cleanup record contained in a buffer popped from a node's
 * message queue.
 *
 * The buffer is walked as an array of CHUNK_CLEANUP_MSG_SIZE byte records;
 * each record is read as a msgqueue_msg_t whose buf member holds a
 * cleanup_msg_t. Records with an unexpected length or a checksum mismatch
 * are logged and skipped. A record failing with EAGAIN is retried once per
 * second while the retry counter is below gloconf.lease_timeout * 2; any
 * other failure is logged and the record is skipped.
 *
 * @param nid node the records belong to
 * @param buf records popped from the queue
 * @param len size of buf in bytes; must be a multiple of
 *            CHUNK_CLEANUP_MSG_SIZE (asserted)
 * @return always 0
 */
static int __chunk_cleanup_exec(const nid_t *nid, const char *buf, int len)
{
        int ret, i, count, retry = 0;
        msgqueue_msg_t *msg;
        const cleanup_msg_t *_msg;

        YASSERT(len % CHUNK_CLEANUP_MSG_SIZE == 0);

        count = len / CHUNK_CLEANUP_MSG_SIZE;
        for (i = 0; i < count; i++) {
                /* NOTE(review): arithmetic on void * is a GNU extension, and the cast drops const from buf */
                msg = (void *)buf + i * CHUNK_CLEANUP_MSG_SIZE;
                _msg = (void *)msg->buf;

#if 1
                /* tolerate damaged records instead of asserting on them */
                if (msg->len != sizeof(*_msg)) {
                        DERROR("skip invalid data\n");
                        continue;
                }

                if (msg->crc != crc32_sum(msg->buf, msg->len)) {
                        DERROR("skip invalid data\n");
                        continue;
                }

#else
                YASSERT(msg->len == sizeof(*_msg));
                YASSERT(msg->crc == crc32_sum(msg->buf, msg->len));
#endif

#if ENABLE_CHUNK_DEBUG
                DINFO("chunk_cleanup %s, chunk "CHKID_FORMAT" @ %s, parent "CHKID_FORMAT", version %llu\n",
                      network_rname(nid), CHKID_ARG(&_msg->chkid), network_rname(nid),
                      CHKID_ARG(&_msg->parent), (LLU)_msg->meta_version);
#else
                DBUG("chunk_cleanup %s, chunk "CHKID_FORMAT" @ %s, parent "CHKID_FORMAT", version %llu\n",
                      network_rname(nid), CHKID_ARG(&_msg->chkid), network_rname(nid),
                      CHKID_ARG(&_msg->parent), (LLU)_msg->meta_version);
#endif

                /* the retry budget is per record */
                retry = 0;
        retry:
                ret = __chunk_cleanup_chunk(_msg->pool, &_msg->parent, &_msg->chkid,
                                            nid, _msg->meta_version);
                if (unlikely(ret)) {
                        ret = _errno(ret);
                        if (ret == EAGAIN && retry < gloconf.lease_timeout * 2) {
                                /* only start warning once the first lease_timeout retries are used up */
                                if (retry > gloconf.lease_timeout) {
                                        DWARN("chunk_cleanup %s, chunk "CHKID_FORMAT"@ "
                                              CHKID_FORMAT", retry %u, %u\n",
                                              network_rname(nid), CHKID_ARG(&_msg->chkid),
                                              CHKID_ARG(&_msg->parent), retry, ret);
                                }
                                retry++;
                                sleep(1);
                                goto retry;
                        } else {
                                /* the record was already popped from the queue, so it is dropped here */
                                DWARN("chunk_cleanup %s, chunk "CHKID_FORMAT"@ "
                                       CHKID_FORMAT", idx %u, %u\n",
                                       network_rname(nid), CHKID_ARG(&_msg->chkid),
                                       CHKID_ARG(&_msg->parent), i, ret);
                                continue;
                        }
                }
        }

        return 0;
}

/**
 * Run one cleanup round for a node.
 *
 * Under the read lock the node's entry is looked up and as many whole
 * records as fit into a MAX_BUF_LEN buffer are popped from its queue.
 * With the lock released, the popped records are executed; when nothing
 * was popped the entry is closed and removed from the table instead.
 *
 * @param nid node to process
 * @return 0 on success, ENOKEY when the node has no entry, otherwise the
 *         error of the failing step
 */
static int __chunk_cleanup(const nid_t *nid)
{
        int ret, len;
        entry_t *ent;
        char buf[MAX_BUF_LEN];

#if ENABLE_CHUNK_DEBUG
        DINFO("chunk_cleanup "NID_FORMAT"\n", NID_ARG(nid));
#else
        DBUG("chunk_cleanup "NID_FORMAT"\n", NID_ARG(nid));
#endif

        ret = sy_rwlock_rdlock(&__lock__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __chunk_cleanup_find(nid, &ent);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        /* round the request down to a whole number of records */
        len = MAX_BUF_LEN;
        len = (len / CHUNK_CLEANUP_MSG_SIZE) * CHUNK_CLEANUP_MSG_SIZE;
        ret = msgqueue_pop(&ent->msgqueue, buf, len);
        if (ret < 0) {
                /* negative result is an error code; otherwise it is the number of bytes popped */
                ret = -ret;
                GOTO(err_lock, ret);
        }

        sy_rwlock_unlock(&__lock__);

#if ENABLE_CHUNK_DEBUG
        DINFO("chunk_cleanup "NID_FORMAT", size %u\n", NID_ARG(nid), ret);
#else
        DBUG("chunk_cleanup "NID_FORMAT", size %u\n", NID_ARG(nid), ret);
#endif

        len = ret;
        if (len) {
                ret = __chunk_cleanup_exec(nid, buf, len);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        } else {
                /* queue drained: drop the entry (takes the write lock itself) */
                ret = __chunk_cleanup_close(nid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_lock:
        sy_rwlock_unlock(&__lock__);
err_ret:
        return ret;
}

static void *__chunk_cleanup_worker(void *arg)
{
        int ret;
        entry_t *ent;
        nid_t nid = *(nid_t *)arg;

        while (1) {
                if (!netable_connected(&nid)) {
                        DINFO("%s not online\n", network_rname(&nid));
                        break;
                }
        
                ret = __chunk_cleanup(&nid);
                if (unlikely(ret)) {
                        if (ret == ENOKEY) {
                                /* ent not found, nothing todo */
                        } else {
                                ret = sy_rwlock_rdlock(&__lock__);
                                if (unlikely(ret))
                                        UNIMPLEMENTED(__DUMP__);

                                ret = __chunk_cleanup_find(&nid, &ent);
                                if (unlikely(ret))
                                        UNIMPLEMENTED(__DUMP__);

                                ent->running = 0;
                                sy_rwlock_unlock(&__lock__);
                        }

                        break;
                }
        }

        return NULL;
}

/**
 * Table iteration callback: start a cleanup worker thread for an entry
 * that has none and whose node can be connected.
 *
 * @param _ctx unused
 * @param _ent the entry_t being visited
 * @return always 0
 */
static int __chunk_cleanup_iterator(void *_ctx, void *_ent)
{
        int ret;
        entry_t *entry = _ent;

        (void) _ctx;

        /* a worker is already active for this node */
        if (entry->running)
                return 0;

        /* node not reachable right now: try again on the next timer tick */
        ret = network_connect1(&entry->nid);
        if (unlikely(ret))
                return 0;

        DINFO("start chunk_cleanup worker for "NID_FORMAT"\n", NID_ARG(&entry->nid));

        entry->running = 1;

        UNIMPLEMENTED(__NULL__);//not safe

        ret = sy_thread_create(__chunk_cleanup_worker, &entry->nid);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        return 0;
}

/**
 * Timer callback: visit every entry of the table under the write lock
 * (starting workers where needed) and re-arm the timer for 30 seconds.
 *
 * @param arg unused
 * @return always 0
 */
static int  __chunk_cleanup_worker__(void *arg)
{
        int err;

        (void) arg;

        err = sy_rwlock_wrlock(&__lock__);
        if (unlikely(err))
                UNIMPLEMENTED(__DUMP__);

        hash_iterate_table_entries(__chunk_cleanup__, __chunk_cleanup_iterator, NULL);

        sy_rwlock_unlock(&__lock__);

        /* schedule the next scan */
        err = timer1_settime(&__chunk_cleanup_timer__, USEC_PER_SEC * 30);
        if (unlikely(err))
                UNIMPLEMENTED(__DUMP__);

        return 0;
}

/* NOTE(review): repeated tentative definition; this object is already declared near the top of the file. */
static worker_handler_t __chunk_cleanup_timer__;

/**
 * Node iteration callback used at start-up: create the cleanup entry of
 * one node.
 *
 * @param node pointer to the nid_t of the node
 * @param _arg unused
 * @return 0 on success, otherwise the error of __chunk_cleanup_create()
 */
static int __chunk_cleanup_init(void *node, void *_arg)
{
        int ret;
        const nid_t *target = node;

        (void)_arg;

        ret = __chunk_cleanup_create(target);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Drain the commit list once: move all pending requests to a private list
 * under the spin lock, then write each of them into the message queue of
 * its node and free it.
 *
 * A request that cannot be written is logged and dropped; the remaining
 * requests are still processed.
 *
 * @param commit_worker shared commit worker state
 * @return 0 on success, otherwise the error of sy_spin_lock()
 */
static int __chunk_cleanup_commit_worker__(commit_worker_t *commit_worker)
{
        int ret;
        struct list_head list;
        struct list_head *pos, *n;
        msglist_t *ent;

        INIT_LIST_HEAD(&list);

        ret = sy_spin_lock(&commit_worker->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* move everything to a local list so the lock is held only briefly */
        list_splice_init(&commit_worker->list, &list);

        sy_spin_unlock(&commit_worker->lock);

        list_for_each_safe(pos, n, &list) {
                /* valid because hook is the first member of msglist_t */
                ent = (msglist_t *)pos;

                ret = __chunk_cleanup_push(ent);
                if (unlikely(ret)) {
                        /* the request is lost; make that visible in the log */
                        DWARN("cleanup "CHKID_FORMAT" of %s not persisted, ret %d\n",
                              CHKID_ARG(&ent->msg.chkid), network_rname(&ent->nid), ret);
                }

                list_del(pos);
                yfree((void **)&ent);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Thread body of the commit worker: wait on the semaphore and drain the
 * commit list each time it is posted. Any failure is treated as fatal.
 *
 * @param _arg the commit_worker_t to serve
 * @return NULL (only reached on the error path)
 */
STATIC void * __chunk_cleanup_commit_worker(void *_arg)
{
        int ret;
        commit_worker_t *worker = _arg;

        for (;;) {
                /* sleep until a producer posts the semaphore */
                ret = _sem_wait(&worker->sem);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __chunk_cleanup_commit_worker__(worker);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

err_ret:
        UNIMPLEMENTED(__DUMP__);
        return NULL;
}

STATIC int __chunk_cleanup_commit_worker_init()
{
        int ret;
        commit_worker_t *commit_worker;

        ret = ymalloc((void **)&commit_worker, sizeof(*commit_worker));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        INIT_LIST_HEAD(&commit_worker->list);

        ret = sem_init(&commit_worker->sem, 0, 0);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = sy_spin_init(&commit_worker->lock);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = sy_thread_create2(__chunk_cleanup_commit_worker, commit_worker, "__chunk_cleanup_commit_worker");
        if (unlikely(ret))
                GOTO(err_free, ret);

        __commit_worker__ = commit_worker;

        return 0;
err_free:
        yfree((void **)&commit_worker);
err_ret:
        return ret;
}

/**
 * Initialize the chunk cleanup service.
 *
 * A thread maintains the commit queue and persists it to disk files;
 * a timer loads the disk files and processes each record.
 *
 * Steps, in order: the table lock, the per-node table, the 30 second
 * timer, the commit worker thread, and finally one entry for every node
 * reported by cluster_listnode_iterator1().
 *
 * @return 0 on success, otherwise the error of the failing step
 */
int chunk_cleanup_init()
{
        int ret;

        ret = sy_rwlock_init(&__lock__, "chunk_cleanup");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        __chunk_cleanup__ = hash_create_table(__cmp, __key, "chunk_cleanup");
        if (__chunk_cleanup__ == NULL) {
                ret = ENOMEM;
                GOTO(err_ret, ret);
        }

#if 0
        DERROR("chunk cleanup service disabled\n");
        return 0;
#endif

        ret = timer1_create(&__chunk_cleanup_timer__, "chunk_cleanup",
                            __chunk_cleanup_worker__, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* first scan of the table happens 30 seconds from now */
        ret = timer1_settime(&__chunk_cleanup_timer__, USEC_PER_SEC * 30);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        ret = __chunk_cleanup_commit_worker_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        /* load (or create) the persistent queue of every known node */
        ret = cluster_listnode_iterator1(__chunk_cleanup_init, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}
